[SPARK-28351][SQL] Support DELETE in DataSource V2 #25115
Conversation
ok to test |
@xianyinxin thanks for working on it. A few general questions:
also cc @brkyvz @jose-torres |
I have no idea what "maintenance" means here. Could you elaborate a bit? UPDATE and DELETE are just DML. Sorry for the dumb question if it's obvious to everyone else. |
I have to agree with the maintenance thing. I get that it's de-acronymizing DML (although I think technically the M is supposed to be "manipulation"), but it's really confusing to draw a distinction between writes and other types of DML. If DELETE can't be one of the string-based capabilities, I'm not sure SupportsWrite makes sense as an interface. |
Test build #107538 has finished for PR 25115 at commit
|
@xianyinxin, thanks for working on this. Is there a design doc to go with the interfaces you're proposing? |
Thanks @cloud-fan .
|
Thank you for the comments @HeartSaVioR . Maybe "maintenance" is not a good word here. The reason I propose to introduce a maintenance interface is that it's hard to embed UPDATE/DELETE, UPSERTS, or MERGE into the current V2 write APIs. |
Thank you for the comments @jose-torres . I'm not sure if I get you; please correct me if I'm wrong. "Maintenance" is not the M in DML, even though the maintenance operations and writes are all DML; that's why I separate "maintenance" from "write". |
Thank you for the comments @rdblue . Sorry, I don't have a design doc; for complicated cases like MERGE we haven't made the workflow clear yet. I can prepare one, but it would come with much uncertainty. BTW, do you have any ideas or suggestions on this? |
sql/catalyst/src/main/scala/org/apache/spark/sql/sources/filters.scala
Test build #107680 has finished for PR 25115 at commit
|
@xianyinxin, I think we should consider what kind of delete support you're proposing to add, and whether we need to add a new builder pattern. I don't think that we need one for simple filter-based deletes.
I think we may need a builder for more complex row-level deletes, but if the intent here is to pass filters to a data source and delete if those filters are supported, then we can add a more direct trait to the table, like SupportsDelete. Alternatively, we could support deletes using the existing overwrite support with no data to write. If either of those approaches would work, then we don't need to add a new builder or make decisions that would affect the future design of the v2 API. What do you think? Would you like to discuss this in the next DSv2 sync in a week? I can add this to the topics. |
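For reference, a minimal sketch of what such a direct, filter-based trait could look like (the trait body and method name here are illustrative assumptions, not the final API):

import org.apache.spark.sql.sources.Filter

// Illustrative sketch: a table mix-in for filter-based deletes. A source
// that cannot apply the given filters exactly should reject them with an
// exception rather than delete a superset or subset of the matching rows.
trait SupportsDeleteSketch {
  def deleteWhere(filters: Array[Filter]): Unit
}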
Thank you @rdblue , pls see the inline comments.
My thought is to provide DELETE support in DSv2, but a general solution may be a little complicated. We considered delete_by_filter and also delete_by_row; both have pros and cons. Delete_by_filter is simple and more efficient, while delete_by_row is more powerful but needs careful design on the V2 API side in Spark. So I think we could support both, as a hybrid solution.
As you pointed out and mentioned above, if we want to provide general DELETE support, or plan for future MERGE INTO or UPSERTS, delete via filters alone is not enough.
Agreed. I had an offline discussion with @cloud-fan, and we discussed this design.
What do you think about the hybrid solution? Is that reasonable?
Thank you very much, Ryan. I'd like to attend the sync next week; please add me to the mail thread and add this topic. |
Delete by expression is a much simpler case than row-level deletes, upserts, and merge into. Since the goal of this PR is to implement delete by expression, I suggest focusing on that so we can get it in. If you want to build the general solution for merge into, upsert, and row-level delete, that's a much longer design process. Since this doesn't require that process, let's separate the two. To do that, I think we should add a SupportsDelete trait for filter-based deletes.
My proposal was to use the existing overwrite support to express filter-based deletes.
I see no reason for a hybrid solution. Filter deletes are a simpler case and can be supported separately. When filters match expectations (e.g., partition filters for Hive, any filter for JDBC) then the source can use them. Otherwise filters can be rejected and Spark can fall back to row-level deletes, if those are supported. I don't see a reason to block filter-based deletes because those are not going to be the same thing as row-level deletes. |
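A sketch of the decision flow described above; every name here is hypothetical (SupportsRowLevelDelete in particular is not part of this PR):

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.sources.Filter

// Hypothetical traits for the sketch; only the filter-based path is in scope here.
trait SupportsDelete { def delete(filters: Array[Filter]): Unit }
trait SupportsRowLevelDelete { def deleteRows(conditionSql: String): Unit }

// Prefer the filter-based delete when the source offers it; otherwise use a
// row-level path if one is supported.
def planDelete(table: Any, filters: Array[Filter], conditionSql: String): Unit =
  table match {
    case d: SupportsDelete => d.delete(filters) // may throw on unsupported filters
    case r: SupportsRowLevelDelete => r.deleteRows(conditionSql)
    case _ => throw new AnalysisException("Table does not support DELETE")
  }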
I vote for SupportsDelete. There is a similar PR opened a long time ago: #21308 . Maybe we can borrow the doc/comments from it? cc @xianyinxin |
Hi @cloud-fan @rdblue , I refactored the code according to your suggestions; the delete support is now the SupportsDelete mix-in. |
Test build #108322 has finished for PR 25115 at commit
|
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/SupportsDelete.java
protected def findReferences(value: Any): Array[String] = value match {
  case f: Filter => f.references
  case _ => Array.empty
}

protected def quoteIdentifier(name: String): String = {
Why not use CatalogV2Implicits to get the quoted method?
This code is borrowed from org.apache.spark.sql.catalyst.util.quoteIdentifier, which is a package util, while CatalogV2Implicits.quoted is not a public util function. We'd better unify the two, I think.
However, this code was introduced by the needs of the delete test case. The test code is now updated according to your suggestion below, which leaves this function (sources.filter.sql) unused, so I have removed it in the latest code. If we need this function in the future (e.g., for translating filters to SQL strings in JDBC), we can submit a new PR.
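For reference, the package util mentioned above amounts to backtick quoting; a minimal sketch of that behavior, matching org.apache.spark.sql.catalyst.util.quoteIdentifier:

// Wrap an identifier in backticks, doubling any embedded backticks so the
// quoted form can be parsed back unambiguously.
def quoteIdentifier(name: String): String =
  s"`${name.replace("`", "``")}`"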
}
val filterStr =
  filters.map { filter =>
    filter.sql
I think it is over-complicated to add a conversion from Filter to a SQL string just so this can parse that filter back into an Expression. I'd prefer a conversion back from Filter to Expression, but I don't think either one is needed.
The overwrite support can run equality filters, which is enough for matching partition keys. I recommend using that and supporting only partition-level deletes in test tables. That way, the table also rejects some delete expressions that are not on partition columns and we can add tests that validate Spark's behavior for those cases.
+1. Ideally the real implementation should build its own filter evaluator, instead of using Spark Expression. See ParquetFilters as an example.
We don't need a complete implementation in the test. The idea of only supporting equality filters and partition keys sounds pretty good.
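A sketch of the equality-on-partition-keys behavior being suggested for the test table, assuming an in-memory store keyed by partition values (the class and field names are made up for illustration):

import scala.collection.mutable
import org.apache.spark.sql.sources.{EqualTo, Filter}

// Test-table sketch: rows are grouped by partition key; only equality
// filters on partition columns are honored, and anything else is rejected so
// Spark's handling of unsupported delete expressions can be exercised.
class PartitionedTestTable(partitionColumns: Seq[String]) {
  val rows = mutable.Map.empty[Map[String, Any], Seq[Any]]

  def deleteWhere(filters: Array[Filter]): Unit = {
    val keys = filters.map {
      case EqualTo(attr, value) if partitionColumns.contains(attr) => attr -> value
      case f => throw new IllegalArgumentException(s"Unsupported filter for delete: $f")
    }.toMap
    // Drop every partition whose key matches all of the equality filters.
    val matching = rows.keys.filter(pk => keys.forall { case (k, v) => pk.get(k).contains(v) })
    rows --= matching
  }
}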
Thanks @rdblue @cloud-fan . I've updated the code according to your suggestions.
Test build #108329 has finished for PR 25115 at commit
|
Test build #108512 has finished for PR 25115 at commit
|
    condition: Expression) extends Command {

  override def children: Seq[LogicalPlan] = child :: Nil
  override def output: Seq[Attribute] = Seq.empty
is this a noop override?
Removed the noop override.
  override def output: Seq[Attribute] = Seq.empty

  override def children: Seq[LogicalPlan] = Seq.empty
ditto, noop override
Done
    condition: Expression)
  extends ParsedStatement {

  override def output: Seq[Attribute] = Seq.empty
ditto, noop override
Done.
@@ -309,6 +322,15 @@ case class DataSourceResolution(
        orCreate = replace.orCreate)
    }

  private def convertDeleteFrom(
I think we can inline it. It's short and used only once.
Done.
sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)") | ||
sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)") | ||
val exc = intercept[AnalysisException] { | ||
sql(s"DELETE FROM $t WHERE id IN (SELECT id FROM $t)") |
can we also test correlated subqueries?
Is it necessary to test a correlated subquery? A correlated subquery is a subset of subqueries, and since we forbid subqueries here, correlated subqueries are also forbidden.
My thought is to later add pre-execution subquery support for DELETE, with correlated subqueries still forbidden; we can modify the test cases at that time.
This sounds reasonable to me.
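For when that follow-up happens, a sketch of what the correlated case could look like (the alias syntax mirrors the test above; the asserted message fragment is an assumption, not the actual error text):

// Hypothetical follow-up test: correlated subqueries should also be rejected.
val exc = intercept[AnalysisException] {
  sql(s"DELETE FROM $t a WHERE EXISTS (SELECT 1 FROM $t b WHERE a.id = b.id)")
}
assert(exc.getMessage.contains("subquery")) // assumed message fragment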
Test build #108872 has finished for PR 25115 at commit
|
@@ -173,6 +173,19 @@ case class DataSourceResolution(
      // only top-level adds are supported using AlterTableAddColumnsCommand
      AlterTableAddColumnsCommand(table, newColumns.map(convertToStructField))

    case DeleteFromStatement(AsTableIdentifier(table), tableAlias, condition) =>
      throw new AnalysisException(
Since this always throws AnalysisException, I think this case should be removed. Instead, the next case should match and the V2SessionCatalog should be used. If the table loaded by the v2 session catalog doesn't support delete, then conversion to physical plan will fail when asDeletable is called.
Then users can still call v2 deletes for formats like parquet that have a v2 implementation that will work.
FYI @brkyvz.
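The asDeletable call mentioned here is essentially a checked cast; a simplified sketch of the pattern (the real helper lives in an implicits object, and the message text here is illustrative):

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.sources.v2.{SupportsDelete, Table}

// Checked conversion from a generic Table to the delete-capable mix-in;
// planning fails with an AnalysisException when the table cannot delete.
implicit class TableHelper(table: Table) {
  def asDeletable: SupportsDelete = table match {
    case support: SupportsDelete => support
    case _ => throw new AnalysisException(s"Table does not support deletes: ${table.name}")
  }
}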
Thank you @rdblue . Removed this case; we now fall back to the sessionCatalog when resolving tables for DeleteFromTable.
I think it's worse to move this case from here to https://github.com/apache/spark/pull/25115/files#diff-57b3d87be744b7d79a9beacf8e5e5eb2R657 .
If we can't merge these two cases into one here, let's keep it as it was.
If I understand correctly, one purpose of removing the first case is that we can execute delete on the parquet format via this API (if we implement it later), as @rdblue mentioned. The key point here is that we resolve the table using V2SessionCatalog as the fallback catalog. The original resolveTable doesn't give any fallback-to-sessionCatalog mechanism (if no catalog is found, it falls back to resolveRelation). So maybe we can modify resolveTable and let it treat V2SessionCatalog as a try option:

case u @ UnresolvedRelation(CatalogObjectIdentifier(maybeCatalog, ident)) =>
  maybeCatalog.orElse(sessionCatalog) match {
    case Some(catalogPlugin) =>
      loadTable(catalogPlugin, ident).map(DataSourceV2Relation.create).getOrElse(u)
    case None =>
      u
  }
I don't think we need to update ResolveTables, though I do see that it would be nice to use ResolveTables as the only rule that resolves UnresolvedRelation for v2 tables. There is already another rule that loads tables from a catalog, ResolveInsertInto.
I considered updating that rule and moving the table resolution part into ResolveTables as well, but I think it is a little cleaner to resolve the table when converting the statement (in DataSourceResolution), as @cloud-fan is suggesting.
One of the reasons to do this for the insert plans is that those plans don't include the target relation as a child. Instead, those plans have the data to insert as a child node, which means that the unresolved relation won't be visible to the ResolveTables rule.
Taking the same approach in this PR would also make this a little cleaner. If DeleteFrom didn't expose the relation as a child, it could be a UnaryNode and you wouldn't need to update some of the other rules to explicitly include DeleteFrom.
Okay, I rolled back the resolve rules for DeleteFromTable to what they were, as @cloud-fan suggested. For cases like deleting from file formats or V2SessionCatalog support, let's open another PR. Another PR for the resolve rules is also needed, because I found other issues related to them. Does this sound reasonable?
We can remove this case after #25402, which updates ResolveTable to fall back to the v2 session catalog.
Saw the code in #25402 . I think it's the best choice.
      identifier: Identifier,
      delete: DeleteFromStatement): DeleteFromTable = {
    val relation = UnresolvedRelation(delete.tableName)
    val aliased = delete.tableAlias.map { SubqueryAlias(_, relation) }.getOrElse(relation)
Nit: one-line map expressions should use (...) instead of {...}, like this:

delete.tableAlias.map(SubqueryAlias(_, relation)).getOrElse(relation)
Fixed.
This looks really close to being ready to me. Thanks for fixing the Filter problem! |
Test build #109021 has finished for PR 25115 at commit
|
retest this please |
LGTM except #25115 (comment) |
Test build #109038 has finished for PR 25115 at commit
|
Test build #109072 has finished for PR 25115 at commit
|
It seems the failed pyspark test has nothing to do with this PR. |
retest this please |
Test build #109089 has finished for PR 25115 at commit
|
retest this please |
retest this please |
Test build #109105 has finished for PR 25115 at commit
|
LGTM, merging to master! |
Thank you @cloud-fan @rdblue for reviewing. |
What changes were proposed in this pull request?
This PR adds DELETE support for V2 data sources. As a first step, it only supports delete by source filters:
void delete(Filter[] filters);
which cannot deal with complicated cases like subqueries.
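A sketch of the flow this implies on the Spark side: split the condition into conjuncts, translate each into a public Filter, and pass the result to the source. DataSourceStrategy.translateFilter does exist, though its exact signature varies across Spark versions; the wrapper object here is just for illustration.

import org.apache.spark.sql.catalyst.expressions.{Expression, PredicateHelper}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.sources.Filter

object DeleteConditionTranslation extends PredicateHelper {
  // Translate each conjunct of the DELETE condition into a source Filter;
  // predicates with no Filter equivalent (such as subqueries) fail fast.
  def translate(condition: Expression): Array[Filter] =
    splitConjunctivePredicates(condition).map { predicate =>
      DataSourceStrategy.translateFilter(predicate).getOrElse(
        throw new IllegalArgumentException(s"Cannot translate to a source filter: $predicate"))
    }.toArray
}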
Since it's uncomfortable to embed the implementation of DELETE in the current V2 APIs, a new data source mix-in called SupportsMaintenance is added, similar to SupportsRead and SupportsWrite. A data source that can be maintained means we can perform DELETE/UPDATE/MERGE/OPTIMIZE on it, as long as it implements the necessary mix-ins.

How was this patch tested?
New test case.
Please review https://spark.apache.org/contributing.html before opening a pull request.